On Chunks - The Art of Dask Part 1

In this notebook we'll be exploring the impact of chunking choices for dask arrays. We'll use an ODC example, but this isn't specific to ODC: it applies to all usage of dask arrays. Chunking choices have a significant impact on performance for three reasons:

  1. Chunks are the unit of work during processing
  2. Chunks are the unit of transport in communicating information between workers
  3. Chunks are directly related to the number of tasks being executed

Performance is thus affected in multiple ways - it is all about trade-offs:

  • if chunks are too small, there will be too many tasks and processing may be inefficient, BUT
  • if chunks are too big, communication takes longer, and the combined total of all chunks required for a calculation may exceed worker memory, causing spilling to disk or, worse, killed workers
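To make this trade-off concrete, here is a small illustrative sketch (not part of the original notebook) using synthetic dask arrays of roughly the same shape as the data loaded later:

```python
import dask.array as da

# Synthetic stand-in for a (time, y, x) satellite stack.
small = da.zeros((100, 12000, 12000), chunks=(10, 300, 300), dtype="uint16")
large = da.zeros((100, 12000, 12000), chunks=(100, 6000, 6000), dtype="uint16")

# Each chunk is at least one task, so chunking directly sets the task count.
print(small.npartitions)  # 10 * 40 * 40 = 16000 chunks
print(large.npartitions)  # 1 * 2 * 2 = 4 chunks
```

Sixteen thousand tiny chunks would swamp the scheduler relative to the work done per chunk, while four chunks of roughly 6.7 GiB each risk exhausting worker memory. Healthy chunk sizes sit between these extremes.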

It's not just size that matters either; the relative contiguity of dimensions matters too:

  • Temporal processing is enhanced by larger chunks along the time dimension
  • Spatial processing is enhanced by larger chunks along the spatial dimensions, BUT
  • Earth Observation data can be sparse spatially; if chunks are too large spatially there will be a lot of empty chunks

Thankfully it is possible to re-chunk data for different stages of computation. Whilst re-chunking is an expensive operation, the efficiency gains for downstream computation can be very significant and are sometimes simply essential to support the numerical processing required. For example, it is often necessary to have a single chunk along the time dimension for temporal calculations.
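Re-chunking for a temporal stage can be sketched with dask's rechunk (illustrative shapes, not the notebook's data):

```python
import dask.array as da

# Spatially chunked data with many small time chunks...
arr = da.zeros((100, 3000, 3000), chunks=(10, 300, 300), dtype="float32")

# ...rechunked to a single chunk along time (axis 0; -1 means "one chunk")
# ahead of a temporal calculation such as a resampled mean over time.
temporal = arr.rechunk({0: -1, 1: 1000, 2: 1000})
print(temporal.chunksize)  # (100, 1000, 1000)
```

Note the spatial chunks were grown at the same time, so the per-chunk volume stays sensible after collapsing the time dimension.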

To understand the impact of chunking choices on your code (the impact is highly algorithm dependent) it is essential to understand both the:

  • Static impact of chunking (e.g. task count, chunk size in memory), and;
  • Dynamic impact of chunking (e.g. CPU load, thread utilisation, network communication, task count and scheduler load).

Dask provides tools for viewing all of these: the static information appears when you print out arrays in the notebook, and the dynamic information in the various graphs of the dask dashboard.
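The static side can be read directly off any dask array without triggering computation; a minimal sketch (the array shape and chunks here are illustrative):

```python
import dask.array as da

arr = da.zeros((100, 3000, 3000), chunks=(50, 600, 600), dtype="uint16")

# None of these trigger computation - they are static properties of the graph.
print(arr.npartitions)   # number of chunks: 2 * 5 * 5 = 50
print(arr.chunksize)     # shape of the largest chunk: (50, 600, 600)
chunk_mib = arr.chunksize[0] * arr.chunksize[1] * arr.chunksize[2] * arr.dtype.itemsize / 2**20
print(f"{chunk_mib:.1f} MiB per chunk")   # in-memory size of one chunk
print(len(arr.__dask_graph__()))          # total tasks currently in the graph
```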

Our example¶

The code below will be familiar: it's the same example from previous notebooks (seasonal mean NDVI over a large area). A normalised burn ratio (NBR2) calculation has been added to provide some additional load, making the performance differences more noticeable in the various graphs. The NBR2 uses two additional bands but is effectively the same type of calculation as the NDVI (a normalised difference ratio).

The primary difference in this example is that the calculation (both NDVI and NBR2) is performed 4 times, each with a different chunking regime. See the chunk_settings list.

When running this notebook, be sure to have the dask dashboard open and preferably visible as calculations proceed.

There are several sections to pay attention to:

  • Status
    • Short term snapshot of the Memory use (total and per worker - also shows Managed, Unmanaged and Spilled to Disk splits), Processing and CPU usage (change the Tab to switch between them)
    • Progress of the optimized Task graph
    • Near term Task Stream (Red is comms, White space is "doing nothing", other colours mostly match the tasks and you can hover over them with the mouse to get more information)
  • Tasks
    • Longer term Task Stream. This is a more comprehensive and accurate view of execution over time
  • System
    • Scheduler CPU, Memory and Communications load. You can zoom the graphs out using the control to get a longer term view.
  • Groups
    • High level view of the Task Graph Groups and their execution. The actual task graph is too detailed to display so this provides some insight into how high level aspects of your algorithm are executing.

All of these graphs are dynamic and should be interpreted over time.

The dask scheduler itself is also dynamic: as your code executes it stores information about how the tasks are executing and the communication occurring, and adjusts scheduling accordingly. It can take a few minutes for the scheduler to settle into a stable pattern. That pattern may also change, particularly in later parts of a computation when work is completing and there are fewer tasks to execute.

Yes, that is a LOT of information. Thankfully you don't necessarily need to learn it all at once. In time, reading the information available will become easier as will knowing what to do about it.

Now let's run this notebook, remember to watch the execution in the Dask Dashboard.

Tip: It's likely you will want to repeat the calculation in this notebook several times. Because the results are persisted to the cluster, simply calling it again will result in no execution (none is required because the results were persisted). Rather than doing cluster.shutdown() and creating a new cluster each time, you can clear the persisted results by performing a client.restart(). This will clear out all previous calculations so you can persist again. You can do this either by creating a new cell or by using a Python Console for this notebook (right click on the notebook and select New Console for Notebook).

Create a cluster¶

A modest cluster will do... and open the dashboard.

In [12]:
# Initialize the Gateway client
from dask.distributed import Client
from dask_gateway import Gateway

number_of_workers = 5 

gateway = Gateway()

clusters = gateway.list_clusters()
if not clusters:
    print('Creating new cluster. Please wait for this to finish.')
    cluster = gateway.new_cluster()
else:
    print(f'An existing cluster was found. Connecting to: {clusters[0].name}')
    cluster = gateway.connect(clusters[0].name)

cluster.scale(number_of_workers)

client = cluster.get_client()
client
An existing cluster was found. Connecting to: easihub.c971161508704ad3ad61c88a024a531c
Out[12]:

Client

Client-c81310bc-0672-11ee-8a47-8e60251f35b2

Connection method: Cluster object Cluster type: dask_gateway.GatewayCluster
Dashboard: https://hub.adias.aquawatchaus.space/services/dask-gateway/clusters/easihub.c971161508704ad3ad61c88a024a531c/status

Cluster Info

GatewayCluster

  • Name: easihub.c971161508704ad3ad61c88a024a531c
  • Dashboard: https://hub.adias.aquawatchaus.space/services/dask-gateway/clusters/easihub.c971161508704ad3ad61c88a024a531c/status

Setup all our functions and query parameters¶

Nothing special here

In [2]:
import pyproj
pyproj.set_use_global_context(True)

import git
import sys, os
from dateutil.parser import parse
from dateutil.relativedelta import relativedelta
from dask.distributed import Client, LocalCluster, wait
import datacube
from datacube.utils import masking
from datacube.utils.aws import configure_s3_access

# EASI defaults
os.environ['USE_PYGEOS'] = '0'
repo = git.Repo('.', search_parent_directories=True).working_tree_dir
if repo not in sys.path: sys.path.append(repo)
from easi_tools import EasiDefaults, notebook_utils
easi = EasiDefaults()
Successfully found configuration for deployment "adias"
In [3]:
dc = datacube.Datacube()
configure_s3_access(aws_unsigned=False, requester_pays=True, client=client)
Out[3]:
<botocore.credentials.Credentials at 0x7fa4047e0b50>
In [4]:
# Get the centroid of the coordinates of the default extents
central_lat = sum(easi.latitude)/2
central_lon = sum(easi.longitude)/2
# central_lat = -42.019
# central_lon = 146.615

# Set the buffer to load around the central coordinates
# This is a radial distance from the central coordinates, so the bbox is 2x the buffer in both dimensions
buffer = 1

# Compute the bounding box for the study area
study_area_lat = (central_lat - buffer, central_lat + buffer)
study_area_lon = (central_lon - buffer, central_lon + buffer)

# Data product
product = easi.product('landsat')

# Set the date range to load data over
set_time = easi.time
set_time = (set_time[0], parse(set_time[0]) + relativedelta(months=6))
#set_time = ("2021-07-01", "2021-12-31")

# Selected measurement names (used in this notebook)
alias = easi.aliases('landsat')
measurements = [alias[x] for x in ['qa_band', 'red', 'nir', 'swir1', 'swir2']]

# Set the QA band name and mask values
qa_band = alias['qa_band']
qa_mask = easi.qa_mask('landsat')

# Set the resampling method for the bands
resampling = {qa_band: "nearest", "*": "average"}

# Set the coordinate reference system and output resolution
set_crs = easi.crs('landsat')  # If defined, else None
set_resolution = easi.resolution('landsat')  # If defined, else None
# set_crs = "epsg:3577"
# set_resolution = (-30, 30)

# Set the scene group_by method
group_by = "solar_day"
In [5]:
def calc_ndvi(dataset):
    # Calculate the components that make up the NDVI calculation
    band_diff = dataset[alias['nir']] - dataset[alias['red']]
    band_sum = dataset[alias['nir']] + dataset[alias['red']]
    # Calculate NDVI
    ndvi = band_diff / band_sum
    return ndvi

def calc_nbr2(dataset):
    # Calculate the components that make up the NBR2 calculation
    band_diff = dataset[alias['swir1']] - dataset[alias['swir2']]
    band_sum = dataset[alias['swir1']] + dataset[alias['swir2']]
    # Calculate NBR2
    nbr2 = band_diff / band_sum
    return nbr2

def mask(dataset, bands):
    # Identify pixels that are either "valid", "water" or "snow"
    cloud_free_mask = masking.make_mask(dataset[qa_band], **qa_mask)
    # Apply the mask
    cloud_free = dataset[bands].astype('float32').where(cloud_free_mask)
    return cloud_free

def seasonal_mean(dataset):
    return dataset.resample(time="QS-DEC").mean('time') # perform the seasonal mean for each quarter

We have an array of chunk settings to trial.

  • Notice the chunk_settings are nominally the same size
  • Notice we're varying the temporal chunking from large to small and adjusting the spatial chunking to keep the overall volume similar (nominally around 100 MB per chunk for the original dataset)

There are two time:1 chunk settings because 50 doesn't have a clean square root. The first uses the nearest square; the second simply uses rectangular chunks (no one said the spatial dimensions needed to be the same).

Given the chunk size in memory is roughly the same, the cluster is the same, and the calculation is the same, any differences in execution are a result of the different chunk shapes.
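A quick sanity check (values copied from the chunk_settings cell below) confirms the four settings describe a nominally equal number of elements per chunk, so chunk shape really is the only variable between runs:

```python
# Element counts per chunk for each of the four settings.
settings = [
    {"time": 100, "x": 300,  "y": 300},
    {"time": 50,  "x": 300,  "y": 600},
    {"time": 1,   "x": 3000, "y": 3000},
    {"time": 1,   "x": 6300, "y": 1500},
]
for s in settings:
    print(s, "->", s["time"] * s["x"] * s["y"], "elements per chunk")
```

The first three are exactly 9,000,000 elements; the last is 9,450,000, about 5% larger.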

In [6]:
chunk_settings = [
    {"chunks": {"time":100, "x":300, "y":300}, "comment": "This run has small spatial chunks but each chunk has a lot of time steps. This results in many small file reads, but there are more total tasks for the scheduler to handle."},
    {"chunks": {"time":50, "x":1*300, "y":2*300}, "comment": "This second run has slightly larger spatial chunks but smaller temporal extents in each chunk. This results in fewer total tasks, but each one takes longer to load."},
    {"chunks": {"time":1, "x":10*300, "y":10*300}, "comment": "This run has only a single time step in each chunk, but large, square spatial extents. As a result, workers need to store much more data in memory and some data is spilled to disk."},
    {"chunks": {"time":1, "x":21*300, "y":5*300}, "comment": "Again this run has a single time step per chunk, but the spatial extents are rectangles."},
]

Now we can loop over all our chunk_settings and create all the required delayed task graphs. This will take a moment as the ODC database will be interrogated for all the necessary dataset information.

You will notice the calculation is split up so we can see the interim results - well, the last one at least, given it's a loop and we're overwriting them.

Different stages of computation will produce different data types and calculations and thus chunk and task counts. We may find that an interim result has a terrible chunk size (e.g. int16 data variables become float64 and thus your chunks are now 4x the size, or a dimension is reduced and chunks are too small). It is thus advisable when tuning to make it possible to view these interim stages to see the static impact.

Remember: there is a single task graph executing to provide the final result. There is no need to persist() or compute() the interim results to see their static attributes. In fact, it may be unwise to persist() as this will chew up resources on the cluster if you don't intend on using the results.
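As an illustration of how an interim stage can end up with very different chunks (using a synthetic array, not the notebook's dataset), note what a reduction over time does to chunk sizes:

```python
import dask.array as da

# Synthetic (time, y, x) stack chunked with the full time dimension per chunk.
arr = da.zeros((100, 3000, 3000), chunks=(100, 300, 300), dtype="float32")
print(arr.chunksize)    # (100, 300, 300)

# Reducing over time collapses that dimension: the interim result's chunks
# are 100x smaller, which may be far too small for efficient downstream work.
mean = arr.mean(axis=0)
print(mean.chunksize)   # (300, 300)
```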

In [7]:
for chunkset in chunk_settings:
    chunks = chunkset["chunks"]
    print(chunks)
{'time': 100, 'x': 300, 'y': 300}
{'time': 50, 'x': 300, 'y': 600}
{'time': 1, 'x': 3000, 'y': 3000}
{'time': 1, 'x': 6300, 'y': 1500}
In [8]:
import numpy as np
results = []
for chunkset in chunk_settings:
    chunks = chunkset["chunks"]
    dataset = dc.load(
                product=product,
                x=study_area_lon,
                y=study_area_lat,
                time=set_time,
                measurements=measurements,
                resampling=resampling,
                output_crs=set_crs,
                resolution=set_resolution,
                dask_chunks = chunks,
                group_by=group_by,
            )
    
    num_time = dataset.sizes['time']
    time_ind = np.linspace(1, num_time, 100, dtype='int') - 1
    dataset = dataset.isel(time=time_ind) # load exactly 100 evenly spaced timesteps so that we can work more easily with different chunks

    masked_dataset = mask(dataset, [alias[x] for x in ['red', 'nir', 'swir1', 'swir2']])
    ndvi = calc_ndvi(masked_dataset)
    nbr2 = calc_nbr2(masked_dataset)
    seasonal_mean_ndvi = seasonal_mean(ndvi)
    seasonal_mean_nbr2 = seasonal_mean(nbr2)
    seasonal_mean_ndvi.name = 'ndvi'
    seasonal_mean_nbr2.name = 'nbr2'
    results.append([seasonal_mean_ndvi, seasonal_mean_nbr2])

Inspecting static information¶

Let's take a look at the vital statistics for the final iteration of the loop. All the calculations are the same and only the chunk parameters vary, so from this iteration we can easily infer the static properties of the other runs.

In [9]:
print(f"dataset size (GiB) {dataset.nbytes / 2**30:.2f}")
print(f"seasonal_mean_ndvi size (GiB) {seasonal_mean_ndvi.nbytes / 2**30:.2f}")
display(dataset)
dataset size (GiB) 139.85
seasonal_mean_ndvi size (GiB) 1.68
<xarray.Dataset>
Dimensions:      (time: 100, y: 12269, x: 12239)
Coordinates:
  * time         (time) datetime64[ns] 2022-02-04T18:45:24.927446 ... 2022-07...
  * y            (y) float64 1.394e+07 1.394e+07 ... 1.357e+07 1.357e+07
  * x            (x) float64 5.276e+06 5.276e+06 ... 5.644e+06 5.644e+06
    spatial_ref  int32 32650
Data variables:
    qa_pixel     (time, y, x) uint16 dask.array<chunksize=(3, 1500, 6300), meta=np.ndarray>
    red          (time, y, x) uint16 dask.array<chunksize=(3, 1500, 6300), meta=np.ndarray>
    nir08        (time, y, x) uint16 dask.array<chunksize=(3, 1500, 6300), meta=np.ndarray>
    swir16       (time, y, x) uint16 dask.array<chunksize=(3, 1500, 6300), meta=np.ndarray>
    swir22       (time, y, x) uint16 dask.array<chunksize=(3, 1500, 6300), meta=np.ndarray>
Attributes:
    crs:           epsg:32650
    grid_mapping:  spatial_ref
[Expanded xarray HTML repr omitted. Key details: each measurement (qa_pixel, red, nir08, swir16, swir22) is a uint16 dask array of shape (100, 12269, 12239) with chunksize (3, 1500, 6300): Array 27.97 GiB, Chunk 54.07 MiB, 612 chunks in 2 graph layers.]
So the source dataset is ~150 GB in size - all uint16 data. We need to be mindful that our calculation will convert these to floats. The code above does an explicit type conversion to float32, which can fully represent a uint16. Without the explicit conversion, the calculation would promote to float64, resulting in double the memory usage for no good reason (for this algorithm).
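The float64 promotion is easy to demonstrate with plain NumPy (illustrative values):

```python
import numpy as np

band = np.arange(4, dtype="uint16")

# True division of integer arrays promotes to float64 by default...
print((band / (band + 1)).dtype)   # float64

# ...while an explicit float32 cast halves the memory of every derived array,
# and float32 can represent every uint16 value exactly (24-bit mantissa).
ratio32 = band.astype("float32") / (band + 1).astype("float32")
print(ratio32.dtype)               # float32
```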

Open the cylinder to show the red dask array details. The chunk is about 100 MiB in size. Generally this is a healthy size though it can be larger and may need to be smaller depending on the calculation involved and communication between workers.

Now let's look at the results for the NDVI and NBR:

In [10]:
display(results[0][0])
display(results[0][1])
<xarray.DataArray 'ndvi' (time: 3, y: 12269, x: 12239)>
dask.array<stack, shape=(3, 12269, 12239), dtype=float32, chunksize=(1, 300, 300), chunktype=numpy.ndarray>
Coordinates:
  * y            (y) float64 1.394e+07 1.394e+07 ... 1.357e+07 1.357e+07
  * x            (x) float64 5.276e+06 5.276e+06 ... 5.644e+06 5.644e+06
    spatial_ref  int32 32650
  * time         (time) datetime64[ns] 2021-12-01 2022-03-01 2022-06-01
[Expanded xarray HTML repr omitted. Key details: 'ndvi' is a float32 dask array of shape (3, 12269, 12239) with chunksize (1, 300, 300): Array 1.68 GiB, Chunk 351.56 kiB, 5043 chunks in 28 graph layers.]
<xarray.DataArray 'nbr2' (time: 3, y: 12269, x: 12239)>
dask.array<stack, shape=(3, 12269, 12239), dtype=float32, chunksize=(1, 300, 300), chunktype=numpy.ndarray>
Coordinates:
  * y            (y) float64 1.394e+07 1.394e+07 ... 1.357e+07 1.357e+07
  * x            (x) float64 5.276e+06 5.276e+06 ... 5.644e+06 5.644e+06
    spatial_ref  int32 32650
  * time         (time) datetime64[ns] 2021-12-01 2022-03-01 2022-06-01
Array: 1.68 GiB total, 351.56 kiB per chunk
Shape: (3, 12269, 12239), chunk shape (1, 300, 300)
Dask graph: 5043 chunks in 28 graph layers
Data type: float32 numpy.ndarray

Notice the result has much smaller chunks (around 350 kiB each). This is because the seasonal mean collapses the time dimension. This may affect downstream usage of the result: the chunks may now be too small, producing too many tasks and reducing the performance of later processing.
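The per-chunk memory footprint reported in the array repr can be worked out by hand: multiply the chunk shape by the dtype's bytes per element. A minimal sketch (the `chunk_nbytes` helper and its dtype table are illustrative, not part of dask's API):

```python
from math import prod

DTYPE_ITEMSIZE = {"float32": 4, "float64": 8, "int16": 2}  # bytes per element

def chunk_nbytes(chunk_shape, dtype="float32"):
    """Bytes held in memory by a single chunk of the given shape and dtype."""
    return prod(chunk_shape) * DTYPE_ITEMSIZE[dtype]

# Each (1, 300, 300) float32 chunk of the seasonal result:
print(chunk_nbytes((1, 300, 300)) / 1024)  # 351.5625 KiB, matching the repr
```

The same arithmetic tells you in advance whether a proposed chunking will fit comfortably in worker memory before you load anything.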

Notice also the task count. Across both results we are approaching 100_000 tasks in the scheduler, depending on task graph optimisation. The scheduler has its own overheads: roughly 1 ms per active task, plus memory for tracking all tasks, including completed ones, since it keeps the history in case it needs to reproduce results (e.g. if a worker is lost). It is possible to exceed 100_000 tasks and still be efficient, depending on your algorithm, but it is something to keep an eye on. In this case we will stay below that number (especially after optimisation).
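The chunk count in the repr, and hence the rough task load, can also be estimated statically: divide each dimension's extent by its chunk size, round up, and take the product. A small sketch (the `chunk_count` helper is illustrative, not a dask function):

```python
from math import ceil, prod

def chunk_count(shape, chunks):
    """Number of chunks in an array with the given shape and per-dimension chunk sizes."""
    return prod(ceil(extent / size) for extent, size in zip(shape, chunks))

n = chunk_count((3, 12269, 12239), (1, 300, 300))
print(n)  # 5043, matching "5043 chunks in 28 graph layers" in the repr
```

Each graph layer contributes on the order of one task per chunk, which is why a few thousand chunks across a few dozen layers quickly approaches the task counts discussed above.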

Persist the results¶

Theoretically we could persist all of the results at once, though we would then be well above the 100_000-task guideline. More importantly, we want to see the differences in the dynamics of execution, so the loop below persists each result one at a time and calls wait() until it completes.

You should monitor execution in the Dask Dashboard

Look at the various tabs as execution proceeds. You will notice differences in memory per worker, communication between workers (red bars in the Task Stream), white space (idle time), and CPU utilisation (remember to click on the CPU tab for this detail). The Tasks section of the dashboard is particularly useful for comparing the dynamics of all four runs, since the length of the calculations means a single snapshot still shows all four blocks of computation at once.

Don't forget: if you want to run the code again, use client.restart() to clear the previous results from the cluster.

Tip: If you leave your computer while this step is running, make sure that it doesn't go to sleep by adjusting your power settings.

In [11]:
client.wait_for_workers(n_workers=number_of_workers)

for i, result in enumerate(results):
    print(f'Run number {i+1}:')
    print(f'Chunks: {chunk_settings[i]["chunks"]}')
    print(chunk_settings[i]["comment"])
    client.restart()
    f = client.persist(result)
    %time wait(f)
    client.restart() # clearing the cluster out so each run is cleanly separated
    print()
Run number 1:
Chunks: {'time': 100, 'x': 300, 'y': 300}
This run has small spatial chunks but each chunk has a lot of time steps. This results in many small file reads, but there are more total tasks for the scheduler to handle.
CPU times: user 7.97 s, sys: 493 ms, total: 8.47 s
Wall time: 11min 14s

Run number 2:
Chunks: {'time': 50, 'x': 300, 'y': 600}
This second run has slightly larger spatial chunks but smaller temporal extents in each chunk. This results in fewer total tasks, but each one takes longer to load.
CPU times: user 4.38 s, sys: 233 ms, total: 4.61 s
Wall time: 6min 29s

Run number 3:
Chunks: {'time': 1, 'x': 3000, 'y': 3000}
This run has only a single time step in each chunk, but large, square spatial extents. As a result, workers need to store much more data in memory and some data is spilled to disk.
CPU times: user 1.22 s, sys: 62.1 ms, total: 1.28 s
Wall time: 8min 43s

Run number 4:
Chunks: {'time': 1, 'x': 6300, 'y': 1500}
Again this run has a single time step per chunk, but the spatial extents are rectangles.
CPU times: user 1.05 s, sys: 53.1 ms, total: 1.1 s
Wall time: 9min 39s
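It is worth checking how the four chunk settings compare in per-chunk memory. A rough comparison, assuming a single float32 band (the `runs` table simply restates the chunk settings printed above):

```python
from math import prod

BYTES_F32 = 4  # bytes per float32 element

runs = {
    1: (100, 300, 300),   # (time, y, x) chunk sizes from the four runs above
    2: (50, 600, 300),
    3: (1, 3000, 3000),
    4: (1, 1500, 6300),
}

for run, chunk in runs.items():
    mib = prod(chunk) * BYTES_F32 / 2**20
    print(f"Run {run}: {mib:5.1f} MiB per float32 chunk")
```

Per-band, the four settings produce nearly identical chunk sizes (roughly 34 to 36 MiB), so the very different wall times come from the shape of the chunks, the resulting read patterns, and the task counts, not from raw chunk memory.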

Understanding the dynamics¶

Be a good dask user - Clean up the cluster resources¶

Disconnecting your client is good practice, but the cluster will still be running, so we need to shut it down as well.

In [ ]:
client.close()

cluster.shutdown()